package shujia;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * MapReduce job that reads the HBase {@code student} table, counts the number
 * of students per class (column {@code info:clazz}) and writes the result to HDFS.
 *
 * <p>Build the code into a jar, upload it to the cluster and run, for example:
 * <pre>
 * hadoop jar HBase-1.0-jar-with-dependencies.jar shujia.Demo5MRReadHBase
 * </pre>
 */
public class Demo5MRReadHBase {

    /** Name of the HBase table that is scanned. */
    private static final String TABLE_NAME = "student";

    /** Column family holding the class column; encoded as UTF-8 by {@link Bytes#toBytes(String)}. */
    private static final byte[] FAMILY = Bytes.toBytes("info");

    /** Qualifier of the class column; encoded as UTF-8 by {@link Bytes#toBytes(String)}. */
    private static final byte[] QUALIFIER = Bytes.toBytes("clazz");

    /** HDFS directory the job writes to; removed before the job starts if it already exists. */
    private static final String OUTPUT_DIR = "/hbaseMR/output1";

    /**
     * Emits {@code (clazz, 1)} for every row of the table that has an
     * {@code info:clazz} cell.
     */
    public static class ReadHBaseMapper extends TableMapper<Text, IntWritable> {

        private static final IntWritable ONE = new IntWritable(1);

        // Reused across map() calls to avoid allocating a new Text per row.
        private final Text clazzKey = new Text();

        /**
         * @param key     row key of the current row (not used for counting)
         * @param value   cells of the current row
         * @param context task context used to emit {@code (clazz, 1)}
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            byte[] clazzBytes = value.getValue(FAMILY, QUALIFIER);
            if (clazzBytes == null) {
                // A row without info:clazz cannot be attributed to a class; building a
                // Text from a null String would throw a NullPointerException, so skip the
                // row and record it in a job counter instead of failing the task.
                context.getCounter("Demo5MRReadHBase", "MISSING_CLAZZ").increment(1);
                return;
            }
            clazzKey.set(Bytes.toString(clazzBytes));
            context.write(clazzKey, ONE);
        }
    }

    /**
     * Sums the counts of one class. Because addition is associative and the input
     * and output types are identical, this class is also used as the combiner.
     */
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        // Reused across reduce() calls to avoid allocating a new IntWritable per key.
        private final IntWritable total = new IntWritable();

        /**
         * @param key     class name
         * @param values  partial counts for that class
         * @param context task context used to emit {@code (clazz, total)}
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            total.set(sum);
            context.write(key, total);
        }
    }

    /**
     * Driver: configures and submits the job, then terminates the JVM with exit
     * status 0 if the job succeeded and 1 otherwise.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");

        Job job = Job.getInstance(conf);
        job.setJobName("Demo5MRReadHBase");
        job.setJarByClass(Demo5MRReadHBase.class);

        // Only info:clazz is needed, so restrict the scan to that column. For a
        // full-table MapReduce scan, fetch more rows per RPC and keep the scanned
        // blocks out of the region servers' block cache.
        Scan scan = new Scan();
        scan.addColumn(FAMILY, QUALIFIER);
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        // Map side
        TableMapReduceUtil.initTableMapperJob(
                TableName.valueOf(TABLE_NAME),
                scan,
                ReadHBaseMapper.class,
                Text.class,
                IntWritable.class,
                job
        );

        // Reduce side (the reducer doubles as combiner to shrink shuffle traffic)
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Output path: delete a previous result so the job does not fail on an
        // existing directory. The FileSystem returned by FileSystem.get is not
        // closed here because the job is submitted right after using the same
        // configuration.
        Path outputPath = new Path(OUTPUT_DIR);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // Propagate job failure to the caller through the process exit status.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }

}
